"""
PGVector adapter implementation for VectorStoreAdapter.

Notes:
- Uses psycopg/psycopg2 when available; imports deferred until initialize() to avoid hard dependency at import time.
- Stores each logical collection in a separate table named vs_<sanitized_collection> with a vector column.
- Requires pgvector extension installed in the target database.
"""
from typing import List, Dict, Any, Optional, Tuple
from loguru import logger
from prometheus_client import Histogram, Counter
import asyncio
import re

from .base import VectorStoreAdapter, VectorStoreConfig, VectorSearchResult

try:
    from pgvector.psycopg import register_vector as _register_pgvector, Vector as _PgVector
except Exception:  # pragma: no cover - optional dependency
    _register_pgvector = None
    _PgVector = None


class PGVectorAdapter(VectorStoreAdapter):
    """Vector store adapter that keeps embeddings in PostgreSQL via pgvector.

    Each logical collection maps to its own table (see ``_sanitize_collection``)
    with ``id``, ``content``, ``metadata`` (JSONB) and ``embedding`` columns.
    Blocking driver calls are handed to the event loop's default executor.
    """

    # Prometheus metrics: class attributes, created once when the class body is
    # executed and shared by every adapter instance. All series carry a
    # "collection" label (callers pass the sanitized table name).
    _H_UPSERT_LAT = Histogram(
        "pgvector_upsert_latency_seconds",
        "Latency for pgvector upsert operations",
        ["collection"],
        buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, float("inf")),
    )
    # Finer low-end buckets than the upsert histogram (starts at 1 ms).
    _H_QUERY_LAT = Histogram(
        "pgvector_query_latency_seconds",
        "Latency for pgvector search queries",
        ["collection"],
        buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, float("inf")),
    )
    # No explicit buckets here, so the prometheus_client defaults apply.
    _H_DELETE_LAT = Histogram(
        "pgvector_delete_latency_seconds",
        "Latency for pgvector delete operations",
        ["collection"],
    )
    # Row counters, incremented after a successful upsert / delete.
    _C_ROWS_UPSERTED = Counter(
        "pgvector_rows_upserted_total",
        "Rows upserted into pgvector",
        ["collection"],
    )
    _C_ROWS_DELETED = Counter(
        "pgvector_rows_deleted_total",
        "Rows deleted from pgvector",
        ["collection"],
    )
    def __init__(self, config: VectorStoreConfig):
        """Store the configuration; nothing is connected until ``initialize()``.

        Args:
            config: adapter configuration. ``connection_params['hnsw_ef_search']``
                (default 64) seeds the ``hnsw.ef_search`` value applied per query.
        """
        super().__init__(config)
        # Connection handles: initialize() fills in either the pool or the
        # single-connection fallback.
        self._conn = None
        self._pool = None
        # Name of the driver that ended up being used ('psycopg_pool', 'psycopg', 'psycopg2').
        self._driver = None
        ef_setting = self.config.connection_params.get('hnsw_ef_search', 64)
        self._ef_search = int(ef_setting)
        # Set to pgvector's Vector class once type registration has succeeded.
        self._vector_cls = None

    async def initialize(self) -> None:
        """Open the database connection(s) and make sure pgvector is usable.

        Driver preference, most to least preferred:
        psycopg v3 with ``psycopg_pool`` -> one psycopg v3 connection ->
        one psycopg2 connection. The drivers are imported here so the module
        can be imported without them.

        The call is a no-op when the adapter is already initialized. Failures
        are logged and swallowed (the adapter simply stays uninitialized); any
        pool or connection opened before the failure is closed rather than
        leaked.
        """
        if self._initialized:
            return
        loop = asyncio.get_running_loop()
        try:
            params = self.config.connection_params
            dsn = self._build_dsn(params)
            # Prefer psycopg v3 with pooling
            try:
                import psycopg
                try:
                    from psycopg_pool import ConnectionPool  # type: ignore
                    # The connection string is the pool's first positional
                    # parameter (``conninfo``); there is no ``dsn`` keyword, so
                    # passing it by keyword always dropped to the fallback.
                    self._pool = ConnectionPool(
                        dsn,
                        min_size=1,
                        max_size=int(params.get('pool_size', 5)),
                    )
                    self._driver = 'psycopg_pool'
                except Exception:
                    # Fallback to single psycopg connection
                    self._pool = None
                    self._conn = await loop.run_in_executor(
                        None,
                        lambda: psycopg.connect(dsn)  # type: ignore
                    )
                    self._driver = 'psycopg'
            except Exception:
                # Final fallback: psycopg2 single connection
                import psycopg2  # type: ignore
                self._conn = await loop.run_in_executor(
                    None,
                    lambda: psycopg2.connect(dsn)
                )
                self._driver = 'psycopg2'

            # Create the extension first: registering the vector type needs the
            # type to exist in the database already.
            await self._exec("CREATE EXTENSION IF NOT EXISTS vector")
            await self._register_vector_support()
            self._initialized = True
            logger.info("PGVector adapter initialized")
        except Exception as e:
            logger.error(f"Failed to initialize PGVector adapter: {e}")
            stale_pool, stale_conn = self._pool, self._conn
            self._conn = None
            self._pool = None
            self._driver = None
            self._initialized = False
            # Release whatever was opened before the failure.
            for resource in (stale_pool, stale_conn):
                if resource is None:
                    continue
                try:
                    await loop.run_in_executor(None, resource.close)
                except Exception as close_exc:
                    logger.debug(f"pgvector.initialize: cleanup close failed: {close_exc}")

    def _build_dsn(self, params: Dict[str, Any]) -> str:
        # Support both DSN and discrete params
        if params.get('dsn'):
            return str(params['dsn'])
        host = params.get('host', 'localhost')
        port = params.get('port', 5432)
        db = params.get('database', 'postgres')
        user = params.get('user', 'postgres')
        password = params.get('password', '')
        sslmode = params.get('sslmode', 'prefer')
        return f"host={host} port={port} dbname={db} user={user} password={password} sslmode={sslmode}"

    def _sanitize_collection(self, name: str) -> str:
        # Allow only alphanum and underscores; replace others with underscore
        safe = re.sub(r"[^A-Za-z0-9_]+", "_", name)
        return f"vs_{safe}"

    def _borrow_conn(self):
        if self._pool is not None:
            return self._pool.connection()
        if self._conn is not None:
            class _Ctx:
                def __init__(self, conn): self.conn = conn
                def __enter__(self): return self.conn
                def __exit__(self, exc_type, exc, tb): return False
            return _Ctx(self._conn)
        raise RuntimeError("PGVector connection not initialized")

    async def _register_vector_support(self) -> None:
        """Best-effort registration of the pgvector type adapters.

        Returns without doing anything when registration already succeeded,
        when the optional ``pgvector`` import at module load failed, or when
        there is neither a pool nor a connection. On success
        ``self._vector_cls`` becomes the pgvector ``Vector`` class; on failure
        it is reset to ``None`` and the error is logged at debug level.

        NOTE(review): in pool mode the pool object itself is handed to
        ``register_vector``. Confirm the installed pgvector/psycopg versions
        accept a pool there; if they do not, registration always ends in the
        failure branch when a pool is in use.
        """
        already_registered = self._vector_cls is not None
        package_missing = _register_pgvector is None or _PgVector is None
        if already_registered or package_missing:
            return
        loop = asyncio.get_event_loop()
        target = self._pool if self._pool is not None else self._conn
        if target is None:
            return
        try:
            await loop.run_in_executor(None, _register_pgvector, target)
        except Exception as exc:  # pragma: no cover - registration best-effort
            logger.debug(f"pgvector registration failed: {exc}")
            self._vector_cls = None
            return
        try:
            self._vector_cls = _PgVector
            logger.debug("Registered pgvector type with psycopg")
        except Exception as exc:  # pragma: no cover - registration best-effort
            logger.debug(f"pgvector registration failed: {exc}")
            self._vector_cls = None

    def _serialize_vector(self, vector: List[float]) -> str:
        """Serialize a python list into a pgvector literal."""
        if self._vector_cls is not None and isinstance(vector, self._vector_cls):  # type: ignore[arg-type]
            vector = list(vector)
        if not isinstance(vector, (list, tuple)):
            raise TypeError("query_vector must be a sequence of floats")
        parts = []
        for val in vector:
            try:
                parts.append(format(float(val), ".15g"))
            except Exception as exc:
                raise TypeError("query_vector must contain numbers") from exc
        return "[" + ",".join(parts) + "]"

    async def _exec(self, sql: str, params: Optional[tuple] = None) -> None:
        """Run one statement that returns no rows, then commit.

        The work happens on the default executor with a borrowed connection.
        ``hnsw.ef_search`` is set first on a best-effort basis; if that fails
        the transaction is rolled back so the failure cannot poison the real
        statement (a failed statement leaves a PostgreSQL transaction aborted).

        Args:
            sql: statement text using ``%s`` placeholders.
            params: positional parameters, or ``None`` for none.

        Raises:
            RuntimeError: if no connection is available.
            Exception: whatever the driver raises for *sql*; the transaction
                is rolled back before the error propagates.
        """
        def _run(ctx, ef):
            with ctx as conn:
                cur = conn.cursor()
                try:
                    try:
                        cur.execute(f"SET hnsw.ef_search = {int(ef)}")
                    except Exception as e:
                        logger.debug(f"pgvector._exec: SET hnsw.ef_search failed: {e}")
                        try:
                            conn.rollback()
                        except Exception as rb_e:
                            logger.debug(f"pgvector._exec: rollback after SET failed: {rb_e}")
                    cur.execute(sql, params or ())
                    conn.commit()
                except Exception:
                    try:
                        conn.rollback()
                    except Exception as rb_e:
                        logger.debug(f"pgvector._exec: rollback failed: {rb_e}")
                    raise
                finally:
                    try:
                        cur.close()
                    except Exception as e:
                        logger.debug(f"pgvector._exec: cursor close failed: {e}")

        await asyncio.get_running_loop().run_in_executor(
            None,
            _run,
            self._borrow_conn(),
            self._ef_search,
        )

    async def _query(self, sql: str, params: Optional[tuple] = None) -> List[tuple]:
        """Run one row-returning statement and return all rows.

        The work happens on the default executor with a borrowed connection.
        ``hnsw.ef_search`` is set first on a best-effort basis; if that fails
        the transaction is rolled back so the failure cannot poison the query.
        After the rows are fetched the transaction is committed, so a shared
        single connection is not left idle inside an open transaction between
        calls.

        Args:
            sql: query text using ``%s`` placeholders.
            params: positional parameters, or ``None`` for none.

        Returns:
            The result of ``cursor.fetchall()``.

        Raises:
            RuntimeError: if no connection is available.
            Exception: whatever the driver raises; the transaction is rolled
                back before the error propagates.
        """
        def _run(ctx, ef):
            with ctx as conn:
                cur = conn.cursor()
                try:
                    try:
                        cur.execute(f"SET hnsw.ef_search = {int(ef)}")
                    except Exception as e:
                        logger.debug(f"pgvector._query: SET hnsw.ef_search failed: {e}")
                        try:
                            conn.rollback()
                        except Exception as rb_e:
                            logger.debug(f"pgvector._query: rollback after SET failed: {rb_e}")
                    cur.execute(sql, params or ())
                    rows = cur.fetchall()
                    # End the read transaction explicitly.
                    conn.commit()
                    return rows
                except Exception:
                    try:
                        conn.rollback()
                    except Exception as rb_e:
                        logger.debug(f"pgvector._query: rollback failed: {rb_e}")
                    raise
                finally:
                    try:
                        cur.close()
                    except Exception as e:
                        logger.debug(f"pgvector._query: cursor close failed: {e}")

        return await asyncio.get_running_loop().run_in_executor(
            None,
            _run,
            self._borrow_conn(),
            self._ef_search,
        )

    async def create_collection(self, collection_name: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        """Create the table for a collection (if missing) and try to index it.

        The table has ``id`` (primary key), ``content``, ``metadata`` (JSONB)
        and ``embedding vector(<embedding_dim>)``. An HNSW index is attempted
        first, then IVFFlat; if both fail the collection stays usable without
        an ANN index and the failures are logged at debug level. A final
        ``ANALYZE`` is best-effort.

        The operator class follows the configured distance metric, matched
        case-insensitively: ``cosine``, ``euclidean``/``l2``, anything else is
        treated as inner product (the same mapping ``rebuild_index`` uses).

        Args:
            collection_name: logical collection name.
            metadata: accepted for interface compatibility; not used here.

        Raises:
            Exception: if creating the table itself fails.
        """
        tbl = self._sanitize_collection(collection_name)
        dim = int(self.config.embedding_dim)
        metric = self.config.distance_metric or 'cosine'
        if isinstance(metric, str):
            metric = metric.lower()
        sql = (
            f"CREATE TABLE IF NOT EXISTS {tbl} ("
            "id TEXT PRIMARY KEY, "
            "content TEXT, "
            "metadata JSONB, "
            f"embedding vector({dim})"
            ")"
        )
        await self._exec(sql)

        if metric == 'cosine':
            ops = 'vector_cosine_ops'
        elif metric in ('euclidean', 'l2'):
            ops = 'vector_l2_ops'
        else:
            ops = 'vector_ip_ops'

        # Attempt HNSW first; fall back to IVFFLAT when the server rejects it
        try:
            await self._exec(
                f"CREATE INDEX IF NOT EXISTS {tbl}_embedding_hnsw ON {tbl} USING hnsw (embedding {ops}) WITH (m=16, ef_construction=200)"
            )
        except Exception as hnsw_exc:
            logger.debug(f"pgvector.create_collection: HNSW index failed for {tbl}: {hnsw_exc}")
            try:
                await self._exec(
                    f"CREATE INDEX IF NOT EXISTS {tbl}_embedding_ivf ON {tbl} USING ivfflat (embedding {ops})"
                )
            except Exception as ivf_exc:
                # Continue without an ANN index (still usable for brute-force)
                logger.debug(f"pgvector.create_collection: IVFFlat index failed for {tbl}: {ivf_exc}")
        # Analyze to help planner (best-effort)
        try:
            await self._exec(f"ANALYZE {tbl}")
        except Exception as analyze_exc:
            logger.debug(f"pgvector.create_collection: ANALYZE failed for {tbl}: {analyze_exc}")

    async def delete_collection(self, collection_name: str) -> None:
        """Drop the table backing *collection_name*; a missing table is not an error."""
        table_name = self._sanitize_collection(collection_name)
        drop_sql = f"DROP TABLE IF EXISTS {table_name}"
        await self._exec(drop_sql)

    async def list_collections(self) -> List[str]:
        """Return the collection names that have a backing ``vs_*`` table.

        Tables are matched on the literal three-character prefix ``vs_``. A
        ``LIKE 'vs_%'`` pattern is not used because ``_`` is a single-character
        wildcard there and would also match names such as ``vsx...``.

        Returns:
            Table names with the ``vs_`` prefix removed, i.e. the sanitized
            collection names.
        """
        rows = await self._query(
            "SELECT tablename FROM pg_tables WHERE left(tablename, 3) = %s",
            ('vs_',),
        )
        return [
            name[3:]
            for (name,) in rows
            if isinstance(name, str) and name.startswith("vs_")
        ]

    async def upsert_vectors(
        self,
        collection_name: str,
        ids: List[str],
        vectors: List[List[float]],
        documents: List[str],
        metadatas: List[Dict[str, Any]]
    ) -> None:
        """Insert rows, or update them when the id already exists.

        All rows are written in one transaction; on failure the transaction is
        rolled back so the connection stays usable. Upsert latency and the
        number of rows written are recorded in the Prometheus metrics.

        Args:
            collection_name: logical collection name.
            ids, vectors, documents, metadatas: parallel lists, one entry per row.

        Raises:
            ValueError: if the four lists differ in length (they used to be
                truncated silently to the shortest one).
            RuntimeError: if no connection is available.
        """
        self._validate_vectors(vectors)
        if not (len(ids) == len(vectors) == len(documents) == len(metadatas)):
            raise ValueError("ids, vectors, documents and metadatas must have the same length")
        tbl = self._sanitize_collection(collection_name)
        if not ids:
            return
        values = list(zip(ids, documents, metadatas, vectors))

        def _batch(ctx):
            with ctx as conn:
                cur = conn.cursor()
                try:
                    args = [(_id, doc, JsonDumper.dumps(meta), vec) for _id, doc, meta, vec in values]
                    cur.executemany(
                        f"INSERT INTO {tbl}(id, content, metadata, embedding) VALUES (%s, %s, %s, %s) "
                        f"ON CONFLICT (id) DO UPDATE SET content=EXCLUDED.content, metadata=EXCLUDED.metadata, embedding=EXCLUDED.embedding",
                        args,
                    )
                    conn.commit()
                except Exception:
                    # Leave the connection clean for the next caller.
                    try:
                        conn.rollback()
                    except Exception as rb_e:
                        logger.debug(f"pgvector.upsert: rollback failed: {rb_e}")
                    raise
                finally:
                    try:
                        cur.close()
                    except Exception as e:
                        logger.debug(f"pgvector.upsert: cursor close failed: {e}")

        # Observe rows + latency
        with self._H_UPSERT_LAT.labels(collection=tbl).time():
            await asyncio.get_running_loop().run_in_executor(None, _batch, self._borrow_conn())
        try:
            self._C_ROWS_UPSERTED.labels(collection=tbl).inc(len(values))
        except Exception as e:
            logger.debug(f"pgvector.upsert: metrics increment failed: {e}")

    async def delete_vectors(self, collection_name: str, ids: List[str]) -> None:
        """Delete the rows whose id is in *ids*.

        A single ``DELETE ... WHERE id = ANY(...)`` is issued, so the driver's
        ``rowcount`` covers the whole batch (with ``executemany`` it may only
        reflect the last statement). An empty *ids* list is a no-op. On failure
        the transaction is rolled back before the error propagates.

        Raises:
            RuntimeError: if no connection is available.
        """
        tbl = self._sanitize_collection(collection_name)
        id_list = list(ids or [])
        if not id_list:
            return

        def _batch(ctx):
            with ctx as conn:
                cur = conn.cursor()
                try:
                    cur.execute(f"DELETE FROM {tbl} WHERE id = ANY(%s)", (id_list,))
                    rc = getattr(cur, 'rowcount', 0)
                    conn.commit()
                    # Drivers report -1 when the count is unknown.
                    return max(0, int(rc)) if rc is not None else 0
                except Exception:
                    try:
                        conn.rollback()
                    except Exception as rb_e:
                        logger.debug(f"pgvector.delete_vectors: rollback failed: {rb_e}")
                    raise
                finally:
                    try:
                        cur.close()
                    except Exception as e:
                        logger.debug(f"pgvector.delete_vectors: cursor close failed: {e}")

        with self._H_DELETE_LAT.labels(collection=tbl).time():
            rc = await asyncio.get_running_loop().run_in_executor(None, _batch, self._borrow_conn())
        try:
            self._C_ROWS_DELETED.labels(collection=tbl).inc(int(rc))
        except Exception as e:
            logger.debug(f"pgvector.delete_vectors: metrics increment failed: {e}")

    async def delete_by_filter(self, collection_name: str, filter: Dict[str, Any]) -> int:
        """Delete rows matching a JSONB metadata filter; returns affected row count.

        Nothing is deleted (and 0 is returned) when *filter* is empty, is not a
        dict, or does not translate into any SQL predicate - for example
        ``{"$or": []}``. Without that guard such a filter would run an
        unconditional ``DELETE`` and empty the whole table.

        On failure the transaction is rolled back before the error propagates.

        Raises:
            RuntimeError: if no connection is available.
        """
        tbl = self._sanitize_collection(collection_name)
        if not (filter and isinstance(filter, dict)):
            # No-op when filter is empty
            return 0
        where_sql, params = self._build_where_from_filter(filter)
        if not where_sql.strip():
            logger.warning(
                f"pgvector.delete_by_filter: filter produced no predicates; refusing to delete all rows of {tbl}"
            )
            return 0

        def _run(ctx):
            with ctx as conn:
                cur = conn.cursor()
                try:
                    cur.execute(f"DELETE FROM {tbl}{where_sql}", tuple(params))
                    rc = getattr(cur, 'rowcount', 0)
                    conn.commit()
                    # Drivers report -1 when the count is unknown.
                    return max(0, int(rc)) if rc is not None else 0
                except Exception:
                    try:
                        conn.rollback()
                    except Exception as rb_e:
                        logger.debug(f"pgvector.delete_by_filter: rollback failed: {rb_e}")
                    raise
                finally:
                    try:
                        cur.close()
                    except Exception as e:
                        logger.debug(f"pgvector.delete_by_filter: cursor close failed: {e}")

        with self._H_DELETE_LAT.labels(collection=tbl).time():
            rc = await asyncio.get_running_loop().run_in_executor(None, _run, self._borrow_conn())
        deleted = int(rc or 0)
        try:
            self._C_ROWS_DELETED.labels(collection=tbl).inc(deleted)
        except Exception as e:
            logger.debug(f"pgvector.delete_by_filter: metrics increment failed: {e}")
        return deleted

    # Adapter-specific helper: translate a metadata filter dict into a SQL WHERE clause
    def _build_where_from_filter(self, filt: Dict[str, Any]) -> Tuple[str, List[Any]]:
        # Prefer JSON containment for simple equality maps (no operators, no nested dict/list values)
        if (
            isinstance(filt, dict)
            and len(filt) > 0
            and all(not isinstance(v, (dict, list, tuple)) for v in filt.values())
            and all(not str(k).startswith('$') for k in filt.keys())
        ):
            import json as _json
            return ' WHERE metadata @> %s', [_json.dumps(filt)]

        # Fallback: build explicit predicates (supports $and/$or and operators)
        def handle_node(node) -> Tuple[List[str], List[Any]]:
            if not isinstance(node, dict):
                return [], []
            local_clauses: List[str] = []
            local_params: List[Any] = []
            for k, v in node.items():
                if k == '$and' and isinstance(v, list):
                    sub_parts = [handle_node(x) for x in v]
                    sub_sql = [f"({ ' AND '.join(p[0]) })" for p in sub_parts if p[0]]
                    sub_params: List[Any] = []
                    for p in sub_parts:
                        sub_params.extend(p[1])
                    if sub_sql:
                        local_clauses.append(' AND '.join(sub_sql))
                        local_params.extend(sub_params)
                elif k == '$or' and isinstance(v, list):
                    sub_parts = [handle_node(x) for x in v]
                    sub_sql = [f"({ ' AND '.join(p[0]) })" for p in sub_parts if p[0]]
                    sub_params: List[Any] = []
                    for p in sub_parts:
                        sub_params.extend(p[1])
                    if sub_sql:
                        local_clauses.append(' OR '.join(sub_sql))
                        local_params.extend(sub_params)
                else:
                    field = str(k)
                    if isinstance(v, dict):
                        for op, val in v.items():
                            if op in ('$eq', 'eq'):
                                local_clauses.append("(metadata->>%s) = %s")
                                local_params.extend([field, str(val)])
                            elif op in ('$neq', 'neq'):
                                local_clauses.append("(metadata->>%s) <> %s")
                                local_params.extend([field, str(val)])
                            elif op in ('$in', 'in') and isinstance(val, (list, tuple)) and val:
                                # Use ANY(array) to safely parametrize lists
                                local_clauses.append("(metadata->>%s) = ANY(%s)")
                                local_params.extend([field, list(map(str, val))])
                            elif op in ('$gt', '$gte', '$lt', '$lte'):
                                cmp = {'$gt': '>', '$gte': '>=', '$lt': '<', '$lte': '<='}[op]
                                local_clauses.append(f"(metadata->>%s)::numeric {cmp} %s")
                                local_params.extend([field, float(val)])
                            else:
                                local_clauses.append("(metadata->>%s) = %s")
                                local_params.extend([field, str(val)])
                    else:
                        local_clauses.append("(metadata->>%s) = %s")
                        local_params.extend([field, str(v)])
            return local_clauses, local_params

        clauses, params = handle_node(filt)
        if clauses:
            return ' WHERE ' + ' AND '.join(clauses), params
        return '', []

    async def list_vectors_paginated(self, collection_name: str, limit: int, offset: int, filter: Optional[Dict[str, Any]] = None, order_by: Optional[str] = None, order_dir: str = 'asc') -> Dict[str, Any]:
        """Return one page of rows (without embeddings) plus the total count.

        Args:
            collection_name: logical collection name.
            limit, offset: page window; negative values are treated as 0.
            filter: optional metadata filter (see ``_build_where_from_filter``).
            order_by: ``'id'`` (default) or ``'metadata.<key>'``. The metadata
                key is sent as a bound parameter, never spliced into the SQL
                text. Any other value falls back to ``id``.
            order_dir: ``'asc'`` (case-insensitive) or anything else for DESC.

        Returns:
            ``{'items': [{'id', 'content', 'metadata'}, ...], 'total': int}``
            where ``total`` counts all rows matching the filter.
        """
        tbl = self._sanitize_collection(collection_name)
        if filter and isinstance(filter, dict):
            where_sql, params = self._build_where_from_filter(filter)
        else:
            where_sql, params = '', []

        order_expr = 'id'
        order_params: List[Any] = []
        if isinstance(order_by, str) and order_by.startswith('metadata.'):
            order_expr = "(metadata->>%s::text)"
            order_params = [order_by.split('.', 1)[1]]
        odir = 'ASC' if str(order_dir).lower() == 'asc' else 'DESC'

        # Placeholder order in the statement: WHERE, ORDER BY, LIMIT, OFFSET
        page_params = list(params) + order_params + [max(0, int(limit)), max(0, int(offset))]
        rows = await self._query(
            f"SELECT id, content, metadata FROM {tbl}{where_sql} ORDER BY {order_expr} {odir} LIMIT %s OFFSET %s",
            tuple(page_params),
        )
        items = [
            {
                'id': str(rid),
                'content': content or '',
                'metadata': metadata if isinstance(metadata, dict) else {},
            }
            for rid, content, metadata in rows
        ]
        cnt_rows = await self._query(f"SELECT COUNT(*) FROM {tbl}{where_sql}", tuple(params))
        total = int(cnt_rows[0][0]) if cnt_rows else 0
        return {'items': items, 'total': total}

    # Adapter-specific helper: list vectors including embeddings for duplication
    async def list_vectors_with_embeddings_paginated(self, collection_name: str, limit: int, offset: int, filter: Optional[Dict[str, Any]] = None, order_by: Optional[str] = None, order_dir: str = 'asc') -> Dict[str, Any]:
        """Return one page of rows including their embeddings, plus the total count.

        Args:
            collection_name: logical collection name.
            limit, offset: page window; negative values are treated as 0.
            filter: optional metadata filter (see ``_build_where_from_filter``).
            order_by: ``'id'`` (default) or ``'metadata.<key>'``. The metadata
                key is sent as a bound parameter, never spliced into the SQL
                text. Any other value falls back to ``id``.
            order_dir: ``'asc'`` (case-insensitive) or anything else for DESC.

        Returns:
            ``{'items': [{'id', 'content', 'metadata', 'vector'}, ...], 'total': int}``.
            ``vector`` is a plain list. Embeddings are accepted as objects with
            ``tolist()``, as list/tuple, or as the text form ``"[1,2,3]"`` that
            the driver returns when the vector type is not registered; anything
            else becomes ``[]``.
        """
        import json as _json

        def _as_list(raw: Any) -> List[Any]:
            try:
                if hasattr(raw, 'tolist'):
                    raw = raw.tolist()
                elif isinstance(raw, tuple):
                    raw = list(raw)
                elif isinstance(raw, str):
                    # The pgvector text form is a JSON-compatible array
                    raw = _json.loads(raw)
            except Exception as exc:
                logger.debug(f"pgvector.list_vectors_with_embeddings: could not decode embedding: {exc}")
                return []
            return raw if isinstance(raw, list) else []

        tbl = self._sanitize_collection(collection_name)
        if filter and isinstance(filter, dict):
            where_sql, params = self._build_where_from_filter(filter)
        else:
            where_sql, params = '', []

        order_expr = 'id'
        order_params: List[Any] = []
        if isinstance(order_by, str) and order_by.startswith('metadata.'):
            order_expr = "(metadata->>%s::text)"
            order_params = [order_by.split('.', 1)[1]]
        odir = 'ASC' if str(order_dir).lower() == 'asc' else 'DESC'

        # Placeholder order in the statement: WHERE, ORDER BY, LIMIT, OFFSET
        page_params = list(params) + order_params + [max(0, int(limit)), max(0, int(offset))]
        rows = await self._query(
            f"SELECT id, content, metadata, embedding FROM {tbl}{where_sql} ORDER BY {order_expr} {odir} LIMIT %s OFFSET %s",
            tuple(page_params),
        )
        items = [
            {
                'id': str(rid),
                'content': content or '',
                'metadata': metadata if isinstance(metadata, dict) else {},
                'vector': _as_list(embedding),
            }
            for rid, content, metadata, embedding in rows
        ]
        cnt_rows = await self._query(f"SELECT COUNT(*) FROM {tbl}{where_sql}", tuple(params))
        total = int(cnt_rows[0][0]) if cnt_rows else 0
        return {'items': items, 'total': total}

    async def get_index_info(self, collection_name: str) -> Dict[str, Any]:
        """Describe the ANN index on the ``embedding`` column, if any.

        Scans ``pg_indexes`` for the collection table. An HNSW index wins as
        soon as one is seen; otherwise the last IVFFlat index seen is reported.

        NOTE(review): this class defines ``get_index_info`` a second time
        further down; the later definition is the one bound on the class, so
        this version is shadowed and never called.

        Returns:
            dict with ``table``, ``index_type`` ('hnsw' | 'ivfflat' | 'none'),
            ``index_name``, ``ops``, ``dimension``, ``metric``, ``ef_search``.
        """
        tbl = self._sanitize_collection(collection_name)
        rows = await self._query(
            "SELECT indexname, indexdef FROM pg_indexes WHERE tablename = %s",
            (tbl,),
        )
        known_ops = ('vector_cosine_ops', 'vector_l2_ops', 'vector_ip_ops')
        index_type, index_name, ops = 'none', None, None
        for name, definition in rows:
            defn = str(definition or '').lower()
            on_embedding = 'embedding' in defn
            is_hnsw = on_embedding and ' using hnsw ' in defn
            is_ivfflat = on_embedding and ' using ivfflat ' in defn
            if not (is_hnsw or is_ivfflat):
                continue
            index_type = 'hnsw' if is_hnsw else 'ivfflat'
            index_name = name
            # Keep the previous value when this definition names no known opclass
            detected = next((candidate for candidate in known_ops if candidate in defn), None)
            if detected is not None:
                ops = detected
            if is_hnsw:
                break
            # ivfflat: keep checking in case hnsw exists
        info: Dict[str, Any] = {'table': tbl}
        info['index_type'] = index_type
        info['index_name'] = index_name
        info['ops'] = ops
        info['dimension'] = self.config.embedding_dim
        info['metric'] = self.config.distance_metric
        info['ef_search'] = self._ef_search
        return info

    def set_ef_search(self, value: int) -> int:
        try:
            self._ef_search = max(1, int(value))
        except Exception:
            pass
        return self._ef_search

    async def rebuild_index(
        self,
        collection_name: str,
        index_type: str = 'hnsw',
        metric: Optional[str] = None,
        m: int = 16,
        ef_construction: int = 200,
        lists: int = 100
    ) -> Dict[str, Any]:
        """Drop existing ANN index on embedding and create the specified one.

        index_type: 'hnsw' | 'ivfflat' | 'drop'
        metric: 'cosine' | 'euclidean' | 'ip' (defaults to adapter metric)
        """
        tbl = self._sanitize_collection(collection_name)
        # Drop existing embedding indexes
        rows = await self._query(
            "SELECT indexname FROM pg_indexes WHERE tablename = %s",
            (tbl,),
        )
        for (name,) in rows:
            try:
                # Fetch index definition to verify it's on embedding
                defrows = await self._query("SELECT indexdef FROM pg_indexes WHERE indexname = %s", (name,))
                if defrows and 'embedding' in (defrows[0][0] or '').lower():
                    await self._exec(f"DROP INDEX IF EXISTS \"{name}\"")
            except Exception:
                # Continue dropping best-effort
                pass

        if index_type.lower() == 'drop':
            try:
                await self._exec(f"ANALYZE {tbl}")
            except Exception:
                pass
            return await self.get_index_info(collection_name)

        op_metric = (metric or self.config.distance_metric or 'cosine').lower()
        ops = 'vector_cosine_ops' if op_metric == 'cosine' else ('vector_l2_ops' if op_metric in ('euclidean','l2') else 'vector_ip_ops')
        if index_type.lower() == 'hnsw':
            await self._exec(
                f"CREATE INDEX IF NOT EXISTS {tbl}_embedding_hnsw ON {tbl} USING hnsw (embedding {ops}) WITH (m={int(m)}, ef_construction={int(ef_construction)})"
            )
        elif index_type.lower() == 'ivfflat':
            await self._exec(
                f"CREATE INDEX IF NOT EXISTS {tbl}_embedding_ivf ON {tbl} USING ivfflat (embedding {ops}) WITH (lists={int(lists)})"
            )
        else:
            raise ValueError("index_type must be one of: hnsw, ivfflat, drop")

        try:
            await self._exec(f"ANALYZE {tbl}")
        except Exception:
            pass
        return await self.get_index_info(collection_name)

    # Adapter-specific helper: get a single vector by id
    async def get_vector(self, collection_name: str, vector_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one row by id (without its embedding).

        Returns:
            ``{'id', 'content', 'metadata'}`` for the first matching row, or
            ``None`` when no row has that id. A NULL/empty content becomes
            ``''`` and non-dict metadata becomes ``{}``.
        """
        table_name = self._sanitize_collection(collection_name)
        select_sql = f"SELECT id, content, metadata FROM {table_name} WHERE id=%s"
        rows = await self._query(select_sql, (vector_id,))
        if rows:
            rid, content, metadata = rows[0]
            record: Dict[str, Any] = {'id': str(rid)}
            record['content'] = content or ''
            record['metadata'] = metadata if isinstance(metadata, dict) else {}
            return record
        return None

    async def search(
        self,
        collection_name: str,
        query_vector: List[float],
        k: int = 10,
        filter: Optional[Dict[str, Any]] = None,
        include_metadata: bool = True
    ) -> List[VectorSearchResult]:
        """Return the *k* nearest rows to *query_vector*.

        The distance operator follows the configured metric, matched
        case-insensitively: ``cosine`` -> ``<=>``, ``euclidean``/``l2`` ->
        ``<->``, anything else -> ``<#>`` (the same mapping ``rebuild_index``
        uses for operator classes). The query vector is bound as a native
        pgvector value when type registration succeeded, otherwise as a text
        literal cast with ``::vector``.

        Scores are a heuristic in [0, 1], higher meaning closer:
        - cosine / euclidean: ``1 / (1 + distance)``;
        - inner product: ``1 / (1 + exp(distance))``. ``<#>`` can be negative,
          so the reciprocal form would divide by zero at -1 and rank strongly
          negative (better) distances below weaker ones.
        A missing or non-numeric distance yields score 0.0 and distance 0.0.

        Args:
            collection_name: logical collection name.
            query_vector: query embedding.
            k: maximum number of results.
            filter: optional metadata filter (see ``_build_where_from_filter``).
            include_metadata: when False, results carry ``{}`` as metadata.

        Raises:
            TypeError: if the vector cannot be serialized (text-literal path).
            RuntimeError: if no connection is available.
        """
        import math

        tbl = self._sanitize_collection(collection_name)
        metric = self.config.distance_metric or 'cosine'
        if isinstance(metric, str):
            metric = metric.lower()
        use_native_vector = self._vector_cls is not None
        # Build distance expression
        if metric == 'cosine':
            op = "<=>"
        elif metric in ('euclidean', 'l2'):
            op = "<->"
        else:
            op = "<#>"
        inner_product = op == "<#>"
        placeholder = "%s" if use_native_vector else "%s::vector"
        dist_expr = f"embedding {op} {placeholder}"
        sql = f"SELECT id, content, metadata, {dist_expr} AS distance FROM {tbl}"

        vector_param: Any
        if use_native_vector:
            if isinstance(query_vector, self._vector_cls):  # type: ignore[arg-type]
                vector_param = query_vector
            else:
                vector_param = self._vector_cls(query_vector)  # type: ignore[call-arg]
        else:
            vector_param = self._serialize_vector(query_vector)
        # Placeholder order in the statement: vector (SELECT list), WHERE, LIMIT
        params: List[Any] = [vector_param]
        if filter and isinstance(filter, dict):
            where_sql, where_params = self._build_where_from_filter(filter)
            sql += where_sql
            params.extend(where_params)
        sql += " ORDER BY distance ASC LIMIT %s"
        params.append(int(k))

        def _similarity(dist: Optional[float]) -> float:
            if dist is None or math.isnan(dist):
                return 0.0
            if inner_product:
                if dist > 700.0:
                    # exp() would overflow; the limit of the logistic is 0
                    return 0.0
                return 1.0 / (1.0 + math.exp(dist))
            # Clamp tiny negative distances caused by rounding
            return 1.0 / (1.0 + max(dist, 0.0))

        with self._H_QUERY_LAT.labels(collection=tbl).time():
            rows = await self._query(sql, tuple(params))
        results: List[VectorSearchResult] = []
        for rid, content, metadata, distance in rows:
            try:
                dist = float(distance)
            except (TypeError, ValueError):
                dist = None
            results.append(VectorSearchResult(
                id=str(rid),
                content=content or "",
                metadata=metadata if include_metadata and isinstance(metadata, dict) else {},
                score=_similarity(dist),
                distance=dist if dist is not None else 0.0,
            ))
        return results

    async def multi_search(
        self,
        collection_patterns: List[str],
        query_vector: List[float],
        k: int = 10,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[VectorSearchResult]:
        """Search every collection matching any pattern and merge the results.

        In a pattern ``*`` matches any run of characters; every other character
        is matched literally (regex metacharacters are escaped). Patterns are
        compared against the names returned by ``list_collections``. A
        collection matching several patterns is searched only once, so its
        hits are not duplicated.

        Returns:
            Up to *k* results across all matching collections, ordered by
            score, best first.
        """
        all_tables = await self.list_collections()
        selected: List[str] = []
        seen = set()
        for pattern in collection_patterns:
            literal_parts = (re.escape(piece) for piece in str(pattern).split('*'))
            regex = re.compile('.*'.join(literal_parts))
            for tbl in all_tables:
                if tbl not in seen and regex.fullmatch(tbl):
                    seen.add(tbl)
                    selected.append(tbl)

        results: List[VectorSearchResult] = []
        for tbl in selected:
            results.extend(await self.search(tbl, query_vector, k=k, filter=filter))
        # Sort by score desc and trim
        results.sort(key=lambda r: r.score, reverse=True)
        return results[:k]

    async def get_collection_stats(self, collection_name: str) -> Dict[str, Any]:
        """Return the row count and configured shape of a collection.

        Returns:
            dict with ``collection``, ``table``, ``count``, ``dim`` and
            ``dimension`` (both the configured embedding dimension) and
            ``metric``.
        """
        table_name = self._sanitize_collection(collection_name)
        count_rows = await self._query(f"SELECT COUNT(*) FROM {table_name}")
        row_count = 0
        if count_rows:
            row_count = int(count_rows[0][0])
        stats: Dict[str, Any] = {"collection": collection_name, "table": table_name}
        stats["count"] = row_count
        stats["dim"] = self.config.embedding_dim
        stats["dimension"] = self.config.embedding_dim
        stats["metric"] = self.config.distance_metric
        return stats

    async def optimize_collection(self, collection_name: str) -> None:
        """Refresh planner statistics for the collection table with ``ANALYZE``.

        Best-effort: a failure is logged at debug level and not raised. No
        ``VACUUM`` is issued.
        """
        tbl = self._sanitize_collection(collection_name)
        try:
            await self._exec(f"ANALYZE {tbl}")
        except Exception as exc:
            logger.debug(f"pgvector.optimize_collection: ANALYZE failed for {tbl}: {exc}")

    async def get_index_info(self, collection_name: str) -> Dict[str, Any]:
        """Describe the ANN index on the ``embedding`` column plus basic stats.

        ``pg_indexes`` is scanned for HNSW/IVFFlat indexes whose definition
        mentions ``embedding``; an HNSW index takes precedence over IVFFlat.
        ``ops`` is the operator class found in that index definition. When no
        index (or no recognisable operator class) is found it falls back to the
        class implied by the configured metric: ``cosine`` ->
        ``vector_cosine_ops``, ``euclidean``/``l2`` -> ``vector_l2_ops``,
        anything else -> ``vector_ip_ops``.

        Returns:
            dict with ``backend``, ``table``, ``index_type`` ('hnsw' |
            'ivfflat' | 'none', or 'unknown' when the catalog query failed),
            ``index_name``, ``ops``, ``dimension``, ``count``, ``metric`` and
            ``ef_search``.

        Raises:
            Exception: if counting the rows of the collection fails.
        """
        tbl = self._sanitize_collection(collection_name)
        metric = self.config.distance_metric or 'cosine'
        metric_key = metric.lower() if isinstance(metric, str) else metric
        if metric_key == 'cosine':
            default_ops = 'vector_cosine_ops'
        elif metric_key in ('euclidean', 'l2'):
            default_ops = 'vector_l2_ops'
        else:
            default_ops = 'vector_ip_ops'

        known_ops = ('vector_cosine_ops', 'vector_l2_ops', 'vector_ip_ops')
        idx_type = "none"
        idx_name = None
        ops = None
        # Identify index type on embedding column
        try:
            rows = await self._query(
                "SELECT indexname, indexdef FROM pg_indexes WHERE tablename = %s",
                (tbl,),
            )
            for name, definition in rows:
                defn = str(definition or "").lower()
                if "embedding" not in defn:
                    continue
                if "using hnsw" in defn:
                    kind = "hnsw"
                elif "using ivfflat" in defn:
                    kind = "ivfflat"
                else:
                    continue
                idx_type, idx_name = kind, name
                ops = next((candidate for candidate in known_ops if candidate in defn), None)
                if kind == "hnsw":
                    break
                # ivfflat: keep looking in case an HNSW index exists as well
        except Exception as exc:
            logger.debug(f"pgvector.get_index_info: catalog lookup failed for {tbl}: {exc}")
            idx_type = "unknown"
        stats = await self.get_collection_stats(collection_name)
        return {
            "backend": "pgvector",
            "table": tbl,
            "index_type": idx_type,
            "index_name": idx_name,
            "dimension": stats.get("dimension", self.config.embedding_dim),
            "count": stats.get("count", 0),
            "ops": ops or default_ops,
            "metric": self.config.distance_metric,
            "ef_search": self._ef_search,
        }

    async def close(self) -> None:
        """Close the pool and/or the single connection, then the base adapter.

        Both handles are detached from the instance before closing, so they are
        cleared even if closing fails or the task is cancelled. Close errors
        are logged at debug level and never raised. The blocking ``close()``
        calls run on the default executor.
        """
        loop = asyncio.get_running_loop()
        pool, self._pool = self._pool, None
        conn, self._conn = self._conn, None
        # Close pooled connections first (if any), then single connection fallback
        if pool is not None:
            try:
                await loop.run_in_executor(None, getattr(pool, "close", lambda: None))
            except Exception as exc:
                logger.debug(f"pgvector.close: closing pool failed: {exc}")
        if conn is not None:
            try:
                await loop.run_in_executor(None, conn.close)
            except Exception as exc:
                logger.debug(f"pgvector.close: closing connection failed: {exc}")
        await super().close()

    async def health(self) -> Dict[str, Any]:
        """Probe the database with ``SELECT 1`` and report the result.

        If the probe fails, one recovery attempt is made. In single-connection
        mode the stale connection is discarded and the initialized flag is
        cleared first, so ``initialize()`` really reconnects instead of
        returning early. In pool mode ``initialize()`` is called as-is and the
        probe is simply retried on the existing pool.

        Returns:
            dict with ``driver``, ``ok`` and, when a pool is in use, a ``pool``
            entry holding whichever size/availability attributes the pool
            object exposes (``None`` for those it does not). Never raises.
        """
        ok = False
        info: Dict[str, Any] = {"driver": self._driver or "unknown"}
        # Include basic pool stats when psycopg_pool is available
        try:
            if self._pool is not None:
                # Guarded getattr to avoid hard dependency on psycopg_pool internals
                info["pool"] = {
                    "min_size": getattr(self._pool, "min_size", None),
                    "max_size": getattr(self._pool, "max_size", None),
                    "num_connections": getattr(self._pool, "num_connections", None),
                    "num_available": getattr(self._pool, "num_available", None),
                }
        except Exception as exc:
            logger.debug(f"pgvector.health: reading pool stats failed: {exc}")
        try:
            ok = bool(await self._query("SELECT 1", None))
        except Exception as probe_exc:
            logger.debug(f"pgvector.health: probe failed, attempting recovery: {probe_exc}")
            try:
                if self._pool is None:
                    stale, self._conn = self._conn, None
                    self._initialized = False
                    if stale is not None:
                        try:
                            await asyncio.get_running_loop().run_in_executor(None, stale.close)
                        except Exception as close_exc:
                            logger.debug(f"pgvector.health: closing stale connection failed: {close_exc}")
                await self.initialize()
                ok = bool(await self._query("SELECT 1", None))
                info["driver"] = self._driver or "unknown"
            except Exception:
                ok = False
        info["ok"] = bool(ok)
        return info


class JsonDumper:
    """Tiny JSON serializer used for the ``metadata`` column."""

    @staticmethod
    def dumps(obj: Dict[str, Any]) -> str:
        """Return *obj* as JSON text, or ``'{}'`` if it cannot be serialized."""
        # json is imported lazily here rather than at module top
        import json as _json
        try:
            encoded = _json.dumps(obj)
        except Exception:
            encoded = '{}'
        return encoded
